﻿using Google.Protobuf;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Actor.Net.Network.Gate
{
    /// <summary>
    /// Context wrapper around a gate communication channel. Builds outgoing frames
    /// (little-endian header: ushort total length, ushort command, int rpcId, followed
    /// by the payload) and tracks pending RPC calls issued through the channel.
    /// </summary>
    public class GateChannelContext
    {
        /// <summary>
        /// The underlying communication channel.
        /// </summary>
        public GateBaseChannel Channel { get; }
        /// <summary>
        /// Id of the business owner of this channel's connection, e.g. a PlayerId
        /// when the connection is a player's long-lived connection.
        /// </summary>
        public long OwnerId => this.Channel.OwnerId;
        /// <summary>
        /// Network Id of the underlying channel.
        /// </summary>
        public int NetworkId => this.Channel.NetworkId;
        /// <summary>
        /// Id of the underlying channel.
        /// </summary>
        public int ChannelId => this.Channel.ChannelId;
        /// <summary>
        /// Whether the underlying channel is currently connected.
        /// </summary>
        public bool Connected => this.Channel.Connected;
        /// <summary>
        /// Reusable buffer in which each outgoing frame is assembled. Only accessed
        /// while holding <see cref="locker"/>.
        /// </summary>
        private MemoryStream SendStream { get; } = new MemoryStream(8192);
        /// <summary>
        /// Pool of reusable RPC call contexts, keyed by their closed generic type.
        /// </summary>
        private ConcurrentDictionary<Type, ConcurrentQueue<IGateCallContext>> CallContextCache { get; } = new ConcurrentDictionary<Type, ConcurrentQueue<IGateCallContext>>();
        /// <summary>
        /// Lock that serializes frame assembly and writes on <see cref="SendStream"/>.
        /// </summary>
        private readonly object locker = new object();

        /// <summary>
        /// Creates a context for the given channel.
        /// </summary>
        /// <param name="channel">The communication channel to wrap.</param>
        public GateChannelContext(GateBaseChannel channel)
        {
            this.Channel = channel;
        }

        /// <summary>
        /// Asks the underlying channel to reconnect.
        /// </summary>
        /// <param name="retryCount">Forwarded unchanged to the channel's Reconnect method.</param>
        public void Reconnect(int retryCount = 0)
        {
            this.Channel.Reconnect(retryCount);
        }

        /// <summary>
        /// Sends a one-way message (rpcId 0) using an enum command.
        /// </summary>
        /// <exception cref="SocketException">Socket error.</exception>
        /// <exception cref="ArgumentException">The serialized frame exceeds 65535 bytes.</exception>
        /// <param name="message">Message to send.</param>
        /// <param name="command">Message command; converted to its 16-bit unsigned value.</param>
        public void Send<TMessage>(TMessage message, Enum command)
        {
            this.Send(message, Convert.ToUInt16(command), 0);
        }

        /// <summary>
        /// Sends a one-way message (rpcId 0).
        /// </summary>
        /// <exception cref="SocketException">Socket error.</exception>
        /// <exception cref="ArgumentException">The serialized frame exceeds 65535 bytes.</exception>
        /// <param name="message">Message to send.</param>
        /// <param name="command">Message command.</param>
        public void Send<TMessage>(TMessage message, ushort command)
        {
            this.Send(message, command, 0);
        }

        /// <summary>
        /// Sends a request using an enum command and waits for its result.
        /// </summary>
        /// <exception cref="SocketException">Socket error.</exception>
        /// <typeparam name="TResult">Type of the result.</typeparam>
        /// <param name="message">Request message.</param>
        /// <param name="command">Message command; converted to its 16-bit unsigned value.</param>
        /// <param name="cancellationToken">Cancellation token; see the ushort overload for its effect.</param>
        /// <returns>The call result.</returns>
        public async Task<TResult> CallAsync<TResult>(object message, Enum command, CancellationToken cancellationToken = default)
            where TResult : new()
        {
            return await this.CallAsync<TResult>(message, Convert.ToUInt16(command), cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Sends a request tagged with a fresh rpcId and waits until the pending call
        /// registered under that id is completed.
        /// </summary>
        /// <remarks>
        /// Cancellation does not throw: the returned task completes with
        /// <c>default(TResult)</c> when the token is cancelled first.
        /// </remarks>
        /// <exception cref="SocketException">Socket error.</exception>
        /// <exception cref="InvalidOperationException">A call with the same rpcId is still pending.</exception>
        /// <typeparam name="TResult">Type of the result.</typeparam>
        /// <param name="message">Request message.</param>
        /// <param name="command">Message command.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>The call result, or <c>default(TResult)</c> if cancelled.</returns>
        public async Task<TResult> CallAsync<TResult>(object message, ushort command, CancellationToken cancellationToken = default)
            where TResult : new()
        {
            GateCallContext<TResult> callContext = this.DequeueCallContext<TResult>();
            int rpcId = this.CreateIncId();
            bool registered = false;
            try
            {
                // Continuations run asynchronously so the awaiting caller's code does not
                // execute inline on whichever thread completes the task.
                var tcs = new TaskCompletionSource<TResult>(TaskCreationOptions.RunContinuationsAsynchronously);
                callContext.SetContext(this.Channel, rpcId, tcs);

                // Register the pending call BEFORE sending, otherwise a fast response
                // could arrive while no context exists for this rpcId.
                registered = this.Channel.CallContextList.TryAdd(rpcId, callContext);
                if (!registered)
                {
                    throw new InvalidOperationException($"An RPC call with id {rpcId} is already pending on this channel.");
                }

                this.Send(message, command, rpcId);

                // Dispose the registration when the call ends so long-lived tokens do not
                // accumulate callbacks (and the closures they capture).
                using (cancellationToken.Register(() => tcs.TrySetResult(default)))
                {
                    return await tcs.Task.ConfigureAwait(false);
                }
            }
            finally
            {
                // Drop the pending entry before the context goes back to the pool; a stale
                // entry would otherwise point at a context that a later call may be using.
                // Only remove what this call added, never another call's entry.
                if (registered)
                {
                    this.Channel.CallContextList.TryRemove(rpcId, out _);
                }
                this.EnqueueCallContext(callContext);
            }
        }

        /// <summary>
        /// Sends a reply message carrying the rpcId of the request being answered.
        /// </summary>
        /// <exception cref="SocketException">Socket error.</exception>
        /// <exception cref="ArgumentException">The serialized frame exceeds 65535 bytes.</exception>
        /// <param name="message">Reply message.</param>
        /// <param name="command">Message command.</param>
        /// <param name="rpcId">Id of the RPC call being answered.</param>
        internal void Response<TMessage>(TMessage message, ushort command, int rpcId)
        {
            this.Send(message, command, rpcId);
        }

        /// <summary>
        /// Assembles one frame in the shared send buffer and hands it to the channel.
        /// Strings are written as UTF-8, byte arrays verbatim, any other object through
        /// the serializer selected by the network's SerializationType.
        /// </summary>
        /// <param name="message">Payload; null produces a header-only frame.</param>
        /// <param name="command">Message command.</param>
        /// <param name="rpcId">RPC id written into the header (0 for one-way sends).</param>
        /// <exception cref="ArgumentException">The frame does not fit the 16-bit length prefix.</exception>
        private void Send(object message, ushort command, int rpcId)
        {
            var sendStream = this.SendStream;
            lock (this.locker)
            {
                try
                {
                    // The length is not known yet: write 0 as a placeholder and patch it below.
                    this.WriteHeadBytes(sendStream, 0, command, rpcId);
                    if (message != null)
                    {
                        var type = message.GetType();
                        if (type == typeof(string))
                        {
                            var bytes = Encoding.UTF8.GetBytes((string)message);
                            sendStream.Write(bytes, 0, bytes.Length);
                        }
                        else if (type == typeof(byte[]))
                        {
                            var bytes = (byte[])message;
                            sendStream.Write(bytes, 0, bytes.Length);
                        }
                        else
                        {
                            // NOTE(review): any other SerializationType writes no payload at all
                            // (header-only frame) - confirm that this is intended.
                            var serializationType = this.Channel.Network.SerializationType;
                            if (serializationType == SerializationType.Protobuf)
                            {
                                ((IMessage)message).WriteTo(sendStream);
                            }
                            else if (serializationType == SerializationType.MessagePack)
                            {
                                MessagePack.MessagePackSerializer.Serialize(type, sendStream, message);
                            }
                        }
                    }

                    // The prefix is 16 bits wide; an unchecked cast would silently wrap and
                    // put a truncated, corrupt frame on the wire.
                    long frameLength = sendStream.Position;
                    if (frameLength > ushort.MaxValue)
                    {
                        throw new ArgumentException(
                            $"Serialized frame is {frameLength} bytes, which exceeds the maximum of {ushort.MaxValue} bytes.",
                            nameof(message));
                    }

                    var length = (ushort)frameLength;
                    this.WriteLength(sendStream, length);

                    // NOTE(review): the stream's internal buffer is passed without copying and is
                    // reused by the next send - assumes Channel.Write consumes or copies it before
                    // returning; confirm against the channel implementation.
                    var sendBuffer = sendStream.GetBuffer();
                    this.Channel.Write(sendBuffer, length);
                }
                finally
                {
                    // Always reset the shared buffer, even when serialization or the write failed.
                    sendStream.Seek(0, SeekOrigin.Begin);
                    sendStream.SetLength(0);
                }
            }
        }

        /// <summary>
        /// Takes a call context for <typeparamref name="TResult"/> from the pool, or
        /// creates a new one when the pool is empty.
        /// </summary>
        private GateCallContext<TResult> DequeueCallContext<TResult>()
            where TResult : new()
        {
            var queue = this.GetContextQueue(typeof(GateCallContext<TResult>));
            if (queue.TryDequeue(out IGateCallContext context))
            {
                return (GateCallContext<TResult>)context;
            }

            return new GateCallContext<TResult>();
        }

        /// <summary>
        /// Clears a call context and returns it to the pool for its concrete type.
        /// </summary>
        /// <param name="context">The context to recycle.</param>
        private void EnqueueCallContext(IGateCallContext context)
        {
            var queue = this.GetContextQueue(context.GetType());
            context.ClearContext();
            queue.Enqueue(context);
        }

        /// <summary>
        /// Returns the pool queue for the given context type, creating it on first use.
        /// GetOrAdd guarantees that all threads end up sharing one queue per type; a
        /// replacing add would let a racing thread discard another thread's queue.
        /// </summary>
        private ConcurrentQueue<IGateCallContext> GetContextQueue(Type contextType)
        {
            return this.CallContextCache.GetOrAdd(contextType, _ => new ConcurrentQueue<IGateCallContext>());
        }

        /// <summary>
        /// Writes the 8-byte frame header: length, command and rpcId, each little-endian.
        /// </summary>
        private void WriteHeadBytes(MemoryStream stream, ushort length, ushort command, int rpcId)
        {
            this.WriteTo(stream, length);
            this.WriteTo(stream, command);
            this.WriteTo(stream, rpcId);
        }

        /// <summary>
        /// Patches the length prefix into the first two bytes of the stream's buffer
        /// (little-endian) without moving the stream position.
        /// </summary>
        private void WriteLength(MemoryStream stream, ushort length)
        {
            var buffer = stream.GetBuffer();
            for (var i = 0; i < sizeof(ushort); i++)
            {
                buffer[i] = (byte)(length >> (i * 8));
            }
        }

        /// <summary>
        /// Appends a 16-bit value to the stream, least significant byte first.
        /// </summary>
        private void WriteTo(MemoryStream stream, ushort value)
        {
            for (var i = 0; i < sizeof(ushort); i++)
            {
                stream.WriteByte((byte)(value >> (i * 8)));
            }
        }

        /// <summary>
        /// Appends a 32-bit value to the stream, least significant byte first.
        /// </summary>
        private void WriteTo(MemoryStream stream, int value)
        {
            for (var i = 0; i < sizeof(int); i++)
            {
                stream.WriteByte((byte)(value >> (i * 8)));
            }
        }

        /// <summary>
        /// Last rpcId handed out; only modified through interlocked operations.
        /// </summary>
        private int incId;

        /// <summary>
        /// Produces the next rpcId in the range 1..int.MaxValue, wrapping back to 1.
        /// A compare-and-swap loop makes the wrap and the increment one atomic step, so
        /// concurrent callers never observe the same id and the counter never overflows.
        /// </summary>
        private int CreateIncId()
        {
            int current;
            int next;
            do
            {
                current = Volatile.Read(ref this.incId);
                next = current == int.MaxValue ? 1 : current + 1;
            }
            while (Interlocked.CompareExchange(ref this.incId, next, current) != current);

            return next;
        }
    }
}
